persist() or cache() methods to mark an RDD to be persisted. If any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it (fault-tolerant).

PySpark.SparkContext

accumulator(value, accum_param=None)

accum = sc.accumulator(0)
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
accum.value
10
parallelize(c, numSlices=None)

xrange is recommended if the input represents a range, for performance.

sc.parallelize([1, 2, 3, 4, 5], 4).glom().collect()
[[1], [2], [3], [4, 5]]
sc.parallelize(xrange(0, 6, 2), 3).glom().collect()
[[0], [2], [4]]
textFile(name, minPartitions=None, use_unicode=True)

textline = sc.textFile('/poem.txt') # from HDFS
textline.collect()
[u'There is Another Sky',
u'',
u'Emily Dickinson',
u'',
u'There is another sky,',
u'Ever serene and fair,',
u'And there is another sunshine,',
u'Though it be darkness there;',
u'Never mind faded forests, Austin,',
u'Never mind silent fields -',
u'Here is a little forest,',
u'Whose leaf is ever green;',
u'Here is a brighter garden,',
u'Where not a frost has been;',
u'In its unfading flowers',
u'I hear the bright bee hum:',
u'Prithee, my brother,',
u'Into my garden come!']
textline = sc.textFile('/poem.txt', use_unicode = False)
textline.collect()
['There is Another Sky',
'',
'Emily Dickinson',
'',
'There is another sky,',
'Ever serene and fair,',
'And there is another sunshine,',
'Though it be darkness there;',
'Never mind faded forests, Austin,',
'Never mind silent fields -',
'Here is a little forest,',
'Whose leaf is ever green;',
'Here is a brighter garden,',
'Where not a frost has been;',
'In its unfading flowers',
'I hear the bright bee hum:',
'Prithee, my brother,',
'Into my garden come!']
Short functions can be passed to RDD methods using Python’s lambda syntax.
logData = sc.textFile(logFile).cache()
errors = logData.filter(lambda line: "ERROR" in line)

Python functions defined with the def keyword can be passed as well. This is useful for longer functions that can't be expressed using the lambda syntax.
def has_error(line):
return "ERROR" in line
errors = logData.filter(has_error)

PySpark.RDD

cogroup(other, numPartitions=None)

x = sc.parallelize([("a", 10), ("b", 20)])
y = sc.parallelize([("b", 30)])
[(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
[('a', ([10], [])), ('b', ([20], [30]))]
collect()

sc.range(5).collect()
[0, 1, 2, 3, 4]
count()

sc.parallelize([1, 2, 3, 4, 5, 6]).count()
6
countByKey()

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("c", 1)])
sorted(rdd.countByKey().items())
[('a', 2), ('b', 1), ('c', 1)]
countByValue()

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("c", 1)])
sorted(rdd.countByValue().items())
[(('a', 1), 2), (('b', 1), 1), (('c', 1), 1)]
distinct(numPartitions=None)

sorted(sc.parallelize([1, 1, 1, 2, 3, 4, 3]).distinct().collect())
[1, 2, 3, 4]
filter(f)

rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
rdd1.filter(lambda x: x % 2 == 0).collect()
[2, 4, 6, 8, 10]
first()

sc.parallelize([10, 20, 30]).first()
10
map(f, preservesPartitioning=False)

rdd2 = sc.parallelize(["1", "2", "3"])
sorted(rdd2.map(lambda x: x*10).collect())
['1111111111', '2222222222', '3333333333']
flatMap(f, preservesPartitioning=False)

rdd = sc.parallelize([5, 6, 7])
sorted(rdd.flatMap(lambda x: range(1, x)).collect())
[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 6]
sorted(rdd.flatMap(lambda x: [(x, x+1), (x+1, x)]).collect())
[(5, 6), (6, 5), (6, 7), (7, 6), (7, 8), (8, 7)]
foreach(f)

accum = sc.accumulator(0)
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
accum.value
10
glom()

rdd = sc.parallelize([1, 2, 3, 4], 2)
sorted(rdd.glom().collect())
[[1, 2], [3, 4]]
histogram(buckets)

rdd = sc.parallelize([5, 18])
rdd.histogram([1, 10, 20, 50])
([1, 10, 20, 50], [1, 1, 0])
reduce(f)
intersection(other)
union(other)
leftOuterJoin(other, numPartitions=None)
rightOuterJoin(other, numPartitions=None)
max(key=None)
min(key=None)
mean()
stdev()
stats()
sum()
take(N)
takeOrdered(N, key=None)
top(N, key=None)
zip(other)
zipWithIndex()
reduceByKey(func, numPartitions=None)
textFile() and reduceByKey()